use actix::prelude::*;
use chrono::{Local, Utc, TimeZone};
use serde_json::json;
use crate::{kv::KVStore, DbPool};
use crate::{kv, actions, models};

use tokio::net::UnixStream;
use tonic::transport::{Endpoint, Uri};
use tower::service_fn;
use tokio_stream::StreamExt;

use std::fmt::Display;
use std::time::Duration;
use crossbeam_channel::tick;

/// Items generated from the `falco.version` protobuf package, pulled in at
/// compile time by `tonic::include_proto!`.
pub mod falco_version {
    tonic::include_proto!("falco.version");
}

/// Items generated from the `falco.outputs` protobuf package, pulled in at
/// compile time by `tonic::include_proto!`.
///
/// This file uses `service_client::ServiceClient`, `Request` and `Priority`
/// from this module.
pub mod falco_outputs {
    tonic::include_proto!("falco.outputs");
}



/// Actor that forwards Falco output events into local storage.
///
/// The actor itself only holds the storage handles; the actual work is done
/// by `FalcoScheduler::execute`, which receives clones of both fields when
/// the actor starts.
pub struct FalcoScheduler {
    /// Key-value store handle; `execute` saves the JSON form of each alert here.
    db: kv::RocksDB,
    /// Connection pool; `execute` takes one connection from it and passes that
    /// connection to `actions::insert_new_alert`.
    pool: DbPool,
}

/// Actor lifecycle hooks for [`FalcoScheduler`].
impl Actor for FalcoScheduler {
    type Context = Context<Self>;

    /// Logs a start message, creates a new `Arbiter`, and spawns
    /// `FalcoScheduler::execute` on it with clones of `db` and `pool`.
    ///
    /// `execute` loops forever and waits on a blocking crossbeam ticker, so it
    /// is spawned on its own arbiter rather than on the actor's context.
    fn started(&mut self, _ctx: &mut Self::Context) {
        println!("FalcoScheduler Actor is started");
        let arbiter = Arbiter::new();
        arbiter.spawn(FalcoScheduler::execute(self.db.clone(), self.pool.clone()));
    }

    // Previous implementation, kept for reference:
    //
    // fn started(&mut self, ctx: &mut Self::Context) {
    //     println!("FalcoScheduler Actor is started");
    //     // TODO: this leaked memory; the cause is not yet clear. Guess: `clone` is
    //     // called repeatedly, a new Arbiter is created repeatedly, and `run_interval`
    //     // does not release the space. Replaced with an endless loop instead.
    //     ctx.run_interval(Duration::from_millis(1000), |this, _ctx| {
    //         let arbiter = Arbiter::new();
    //         arbiter.spawn(FalcoScheduler::execute(this.db.clone()));
    //     });
    // }

    /// Logs a stop message. The task spawned in `started` is not signalled here.
    fn stopped(&mut self, _ctx: &mut Context<Self>) {
        println!("FalcoScheduler Actor is stopped");
    }
}

/// Renders a `Priority` with the same text as its `Debug` representation, so
/// that `to_string()` can be called on it.
impl Display for falco_outputs::Priority {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        // Same call that `write!(f, "{:?}", self)` expands to.
        f.write_fmt(format_args!("{:?}", self))
    }
}

impl FalcoScheduler {
    /// Builds a scheduler from its two storage handles.
    ///
    /// * `db` - key-value store that receives the JSON form of every alert.
    /// * `pool` - connection pool used when inserting alert rows.
    #[allow(unused)]
    pub(crate) fn get_scheduler(db: kv::RocksDB, pool: DbPool) -> Self {
        FalcoScheduler { db, pool }
    }

    /// Polls the Falco gRPC outputs service forever and persists every event.
    ///
    /// The function never returns. It:
    ///
    /// 1. takes one connection from `pool` and keeps it for its whole lifetime;
    /// 2. connects to Falco through the Unix socket `/var/run/falco.sock`,
    ///    retrying once per tick (1000 ms) until the connection succeeds;
    /// 3. on every tick issues a `get` request and drains the returned stream;
    /// 4. for each event, saves a JSON document via `db.save` and inserts an
    ///    `models::Alert` row via `actions::insert_new_alert`.
    ///
    /// A failed request or a stream error is logged and retried on the next
    /// tick; it no longer terminates the task. The results of `db.save` and
    /// `actions::insert_new_alert` are not inspected.
    ///
    /// NOTE: `ticker.recv()` is a blocking crossbeam call, so this future blocks
    /// the thread that drives it. `Actor::started` spawns it on a separate
    /// `Arbiter` for that reason.
    ///
    /// # Panics
    ///
    /// Panics if no connection can be obtained from `pool` at startup.
    async fn execute(db: kv::RocksDB, pool: DbPool) {
        let conn = pool
            .get()
            .expect("FalcoScheduler: could not obtain a database connection from the pool");

        let ticker = tick(Duration::from_millis(1000));

        // The URI is a placeholder required by tonic; the connector below
        // ignores it and always dials the Unix socket. Built once, reused for
        // every reconnect attempt.
        let endpoint = Endpoint::try_from("http://[::]:50051")
            .expect("static endpoint URI is valid");

        loop {
            let connect_result = endpoint
                .connect_with_connector(service_fn(|_: Uri| {
                    // Connect to a Uds socket
                    UnixStream::connect("/var/run/falco.sock")
                }))
                .await;
            let channel = match connect_result {
                Ok(channel) => channel,
                Err(e) => {
                    println!("{}", e);
                    // Wait one tick before trying to connect again.
                    let _ = ticker.recv().unwrap();
                    continue;
                }
            };

            let mut falco_outputs_service_client =
                falco_outputs::service_client::ServiceClient::new(channel);
            loop {
                let _ = ticker.recv().unwrap();

                let mut resp_stream = match falco_outputs_service_client
                    .get(falco_outputs::Request {})
                    .await
                {
                    Ok(response) => response.into_inner(),
                    Err(status) => {
                        println!("{:?}", status);
                        continue;
                    }
                };

                while let Some(item) = resp_stream.next().await {
                    let received = match item {
                        Ok(received) => received,
                        Err(status) => {
                            // A broken stream must not kill the task: give up on
                            // this stream and issue a fresh request on the next tick.
                            println!("{:?}", status);
                            break;
                        }
                    };

                    // A missing timestamp is recorded as the Unix epoch.
                    let (seconds, nanos) = received
                        .time
                        .as_ref()
                        .map_or((0, 0), |time| (time.seconds, time.nanos));

                    let alert_json = json!({
                        "title": received.rule,
                        "event_time": seconds as u64,
                        "event_type": received.source,
                        "fields": received.output_fields
                    });

                    // A malformed timestamp (negative nanos, out-of-range seconds)
                    // falls back to 0, like a missing one, instead of panicking.
                    let time_millis = u32::try_from(nanos)
                        .ok()
                        .and_then(|nanos| Utc.timestamp_opt(seconds, nanos).single())
                        .map_or(0, |timestamp| timestamp.timestamp_millis());

                    // The key is the local receive time, not the event time.
                    let key = Local::now().format("%Y%m%d%H%M%S%f").to_string();
                    let doc_json = alert_json.to_string();

                    // save to rocksdb
                    db.save(&key, &doc_json);

                    // save to sqlite
                    let _db_result = actions::insert_new_alert(
                        models::Alert {
                            id: key,
                            severity: received.priority().to_string(),
                            // "abnormal behavior"
                            category: "异常行为".to_string(),
                            description: received.rule,
                            time: time_millis,
                            // "kernel call"
                            source: "内核调用".to_string(),
                            mal_obj_type: "".to_string(),
                            mal_obj_key: "".to_string(),
                            mal_obj_country: "".to_string(),
                            mal_obj_city: "".to_string(),
                            rule_id: "F-1".to_string(),
                            solved: false,
                        },
                        &conn,
                    );
                }
            }
        }
    }
}